batch/debounce websocket set_props#3783
Conversation
camdecoster
left a comment
There was a problem hiding this comment.
Could you add a changelog entry? I'd also like to see a test related to the new batching mechanism.
| * Groups set_props by renderer and forwards as a single batch message. | ||
| * @param messages Array of messages | ||
| */ | ||
| private handleBatchedMessages(messages: unknown[]): void { |
There was a problem hiding this comment.
Why is messages of type unknown[]? Why not WorkerMessages[]?
| if ((msg as any).type === 'heartbeat_ack') { | ||
| continue; | ||
| } | ||
| if (msg.type === WorkerMessageType.SET_PROPS) { |
There was a problem hiding this comment.
If you use WorkerMessage[] instead of unknown[], you can get rid of the casts here.
| if (!setPropsPayloadsByRenderer.has(rendererId)) { | ||
| setPropsPayloadsByRenderer.set(rendererId, []); | ||
| } | ||
| setPropsPayloadsByRenderer.get(rendererId)!.push(setPropsMsg.payload); |
There was a problem hiding this comment.
Non null assertions are a code smell. Could this be rewritten to avoid it?
| // Forward batched set_props to each renderer | ||
| for (const [rendererId, payloads] of setPropsPayloadsByRenderer) { | ||
| const port = this.renderers.get(rendererId); | ||
| if (port) { |
There was a problem hiding this comment.
Should a message be logged if port is falsy?
There was a problem hiding this comment.
Does the message order matter? If you batch everything, the order will change. [set_prop_1, otherMessage, set_prop_2] gets sent out in the order of [set_prop_1, set_prop_2], [otherMessage].
| Args: | ||
| send_text: Async function to send text data over WebSocket | ||
| outbound_queue: janus.Queue instance for receiving messages (strings) | ||
| batch_delay: Time in seconds to wait for additional messages (default: 5ms) |
There was a problem hiding this comment.
The OP says use 0 if you want to disable batching. Maybe that should be included here? Would None be a better option? Would it be better to call say this disables debouncing rather than disabling batching?
There was a problem hiding this comment.
0 matches the same pattern as the inactivity config. Debouncing is the just the mechanism by which the batching happens, so batching is more relevant than debounce imo.
| # Wait indefinitely for first message, then use timeout for batching | ||
| timeout = batch_delay if messages else None | ||
| try: | ||
| msg = await asyncio.wait_for(q.get(), timeout=timeout) |
There was a problem hiding this comment.
Would it be more appropriate to get a message and send it immediately with send_text if batch_delay is 0?
There was a problem hiding this comment.
Maybe it saves one iteration 👍
| workerClient.onSetProps = (payload: SetPropsPayload) => { | ||
| processSetProps(payload); | ||
| }; |
There was a problem hiding this comment.
Would this also work?
| workerClient.onSetProps = (payload: SetPropsPayload) => { | |
| processSetProps(payload); | |
| }; | |
| workerClient.onSetProps = processSetProps; |
| batch_delay: Time in seconds to wait for additional messages (default: 5ms) | ||
| """ | ||
| q = outbound_queue.async_q | ||
| messages: list = [] |
There was a problem hiding this comment.
You could tighten this type if you want:
| messages: list = [] | |
| messages: list[str] = [] |
| websocket_callbacks: Optional[bool] = False, | ||
| websocket_allowed_origins: Optional[List[str]] = None, | ||
| websocket_inactivity_timeout: Optional[int] = 300000, | ||
| websocket_batch_delay: Optional[float] = 0.005, |
There was a problem hiding this comment.
What happens if a user explicitly passes in None for this value?
There was a problem hiding this comment.
It doesn't pass typing? Optional[float] requires a float and has float as default.
When you use set_props in a loop inside a websocket callback, it would create single updates in the renderer and can result in lag.
This PR add a batching and debounce mechanism to set_props, default to 5ms/200hz configurable with